package day04;

import beans.SensorReading;
import day03.window.FlinkWindow00;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 检测温度跳变，两次温度差值大于10，输出报警
 *
 * @author lvbingbing
 * @date 2022-01-05 23:00
 */
public class FlinkState03 {
    public static void main(String[] args) throws Exception {
        // 1、创建 FlinkWindow00 对象，有参构造会初始化 env，并从socket文本流中读取数据
        int parallelism = 1;
        FlinkWindow00 flinkWindow = new FlinkWindow00(parallelism);
        // 2、获取可执行环境
        StreamExecutionEnvironment env = flinkWindow.getEnv();
        // 3、键控状态的使用
        temperatureWarning(flinkWindow.getSingleOutputStreamOperator());
        // 4、触发程序执行
        env.execute();
    }

    /**
     * 检测温度跳变，两次温度差值大于10，输出报警
     *
     * @param sensorReadingStream <br>
     */
    private static void temperatureWarning(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        sensorReadingStream.keyBy("id")
                .flatMap(new WarningTemperature(10.0))
                .print("温度跳变告警日志");
    }

    private static class WarningTemperature extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>> {

        private static final long serialVersionUID = -144267249586740514L;

        /**
         * 定义状态，保存上一次的温度值
         */
        private transient ValueState<Double> valueState;

        /**
         * 温度跳变阈值
         */
        private final Double threshold;

        public WarningTemperature(Double threshold) {
            this.threshold = threshold;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            valueState = getRuntimeContext().getState(new ValueStateDescriptor<>("myValueState", Double.class));
        }

        @Override
        public void flatMap(SensorReading sensorReading, Collector<Tuple3<String, Double, Double>> out) throws Exception {
            // 获取状态
            Double lastTemperature = valueState.value();
            Double currentTemperature = sensorReading.getTemperature();
            // 如果状态不为null，那么判断两次温度差值
            if (lastTemperature != null) {
                double diff = Math.abs(lastTemperature - currentTemperature);
                if (diff > threshold) {
                    out.collect(new Tuple3<>(sensorReading.getId(), lastTemperature, currentTemperature));
                }
            }
            // 更新状态
            valueState.update(currentTemperature);
        }

        @Override
        public void close() throws Exception {
            valueState.clear();
        }
    }
}